/*
*	Copyright(c) 2020 lutianming email：641471957@qq.com
*
*	Sheeps may be copied only under the terms of the GNU Affero General Public License v3.0
*/


#include "framework.h"
#include "AgentProtocol.h"
#include "AgentProtocolSub.h"
#include "AgentManager.h"
#include "ServerProtocol.h"
#ifdef __WINDOWS__
#include <iphlpapi.h>
#include <io.h>
#pragma comment(lib, "pdh.lib")
#pragma comment(lib, "IPHLPAPI.lib")
#else
#include <dirent.h>
#endif // __WINDOWS__

int				AgentSyncFile = 1;
AgentProtocol*	AgentProto = NULL;
char			AgentManagerIP[40] = { 0x0 };
unsigned short	AgentManagerPort = 0;
uint8_t			AgentProjectid = 0;
char			AgentGroupid[32] = {0x0};
cJSON*			AgentReportRootObject = NULL;
cJSON*			AgentReportRootArray = NULL;

int						AgentStaticIP_SIZE = 0;
std::vector<char*>	AgentStaticIP;

//char			AgentLocalIP[40] = { 0x0 };
unsigned long long	AgentBandWidth = 0;

std::map<int, TaskReportLogInfo*> ReportLogInfos;

static void agent_proto_timer(HTIMER time, BaseWorker* proto, void* user_data) {
	AgentProto->Loop();
}

int agent_virtual_ip_load() {
	char buf[40] = { 0x0 };
	int len;
	char* ip;
	FILE* fhandl = NULL;

	std::vector<char*>::iterator iter;
	for (iter = AgentStaticIP.begin(); iter != AgentStaticIP.end(); iter++) {
		free(*iter);
	}
	AgentStaticIP.clear();

	char file[256] = { 0x0 };
	snprintf(file, sizeof(file), "%sips.txt", EXE_Path);

#ifdef __WINDOWS__
	int ret = fopen_s(&fhandl, file, "r");
#else
	fhandl = fopen(file, "r");
#endif // __WINDOWS__
	if (fhandl == NULL) return 0;
	while (true){
		if (fgets(buf, sizeof(buf), fhandl) == NULL) {
			break;
		}
		len = string_trim(buf, int(strlen(buf)));
		if (len) {
			ip = (char*)malloc(len + 1);
			if (ip) {
				memcpy(ip, buf, len);
				*(ip + len) = 0x0;
				AgentStaticIP.push_back(ip);
			}
		}
	}
	fclose(fhandl);
	return (int)AgentStaticIP.size();
}

bool AgentProtocolInit() {
	if (AgentManagerPort != 0) {
		ClientLogInit(ConfigFile);
		if (AgentProto == NULL) AgentProto = new(std::nothrow) AgentProtocol();
		if (AgentProto) {
			AgentSyncFile = config_get_int_value("agent", "syncfile", 0);
			int virtual_ip_count = agent_virtual_ip_load();
			int virtual_ip = config_get_int_value("agent", "static_ip", 0);
			if (virtual_ip) AgentStaticIP_SIZE = virtual_ip_count;

			AgentProto->auto_release(false);
			AgentProto->thread_set();
			TimerCreate(AgentProto, NULL, 0, 100, agent_proto_timer);
		}
	}
	return true;
}

#ifdef __WINDOWS__
std::string AgentNetAdapterDes;
MEMORYSTATUSEX ms;
PDH_STATUS state;
PDH_HQUERY		m_CpuQuery, m_NetQuery;
PDH_HCOUNTER	m_CpuTotal, m_NetTotal;

inline std::string WString2String(const std::wstring& wstr)
{
	int len = WideCharToMultiByte(CP_UTF8, 0, wstr.c_str(), (int)wstr.size(), NULL, 0, NULL, NULL);
	char* buffer = new char[len + 1];
	memset(buffer, '\0', sizeof(char) * (len + 1));
	WideCharToMultiByte(CP_UTF8, 0, wstr.c_str(), (int)wstr.size(), buffer, len, NULL, NULL);
	std::string result(buffer);
	delete[] buffer;
	return result;
}

static void init_net_adapter() {
	PIP_ADAPTER_ADDRESSES pAddresses = NULL;
	ULONG outBufLen = 0;
	GetAdaptersAddresses(AF_UNSPEC, 0, NULL, pAddresses, &outBufLen);
	pAddresses = (IP_ADAPTER_ADDRESSES*)malloc(outBufLen);
	if (!pAddresses) return;

	DWORD dwRetVal = 0;
	ULONG flags = GAA_FLAG_INCLUDE_PREFIX
		| GAA_FLAG_SKIP_DNS_SERVER
		| GAA_FLAG_SKIP_MULTICAST
		| GAA_FLAG_SKIP_ANYCAST;
	PIP_ADAPTER_ADDRESSES pCurrAddresses = NULL;
	PIP_ADAPTER_UNICAST_ADDRESS pUnicast = NULL;
	bool found = false;
	dwRetVal = GetAdaptersAddresses(AF_UNSPEC, flags, NULL, pAddresses, &outBufLen);
	if (dwRetVal == NO_ERROR) { // If successful, output some information from the data we received
		pCurrAddresses = pAddresses;
		while (pCurrAddresses) {
			if (pCurrAddresses->OperStatus != IfOperStatusUp
				|| pCurrAddresses->IfType == IF_TYPE_SOFTWARE_LOOPBACK){
				//|| strstr(std::wcstombs(pCurrAddresses->FriendlyName).c_str(), "vEthernet (WSL)") != NULL) {
				pCurrAddresses = pCurrAddresses->Next;
				continue;
			}
			//pUnicast = pCurrAddresses->FirstUnicastAddress;
			//while (pUnicast) {
			//	char IP[40] = { 0x0 };
			//	if (AF_INET == pUnicast->Address.lpSockaddr->sa_family) {
			//		inet_ntop(PF_INET, &((sockaddr_in*)pUnicast->Address.lpSockaddr)->sin_addr, IP, sizeof(IP));
			//	}
			//	else if (AF_INET6 == pUnicast->Address.lpSockaddr->sa_family) {
			//		inet_ntop(PF_INET6, &((sockaddr_in6*)pUnicast->Address.lpSockaddr)->sin6_addr, IP, sizeof(IP));
			//	}
			//	if (strcmp(IP, AgentLocalIP) == 0) {
			//		found = true;
			//		break;
			//	}
			//	pUnicast = pUnicast->Next;
			//}
			//if (found) {
				AgentBandWidth = pCurrAddresses->TransmitLinkSpeed/1024/1024;
				std::wstring adapter_description = pCurrAddresses->Description;
				AgentNetAdapterDes = WString2String(adapter_description);
				//LOG(clogId, LOG_DEBUG, "adapter: %d:%s\r\n", AgentBandWidth, AgentNetAdapterDes.c_str());
				break;
			//}
			/*pCurrAddresses = pCurrAddresses->Next;*/
		}
	}
	if (pAddresses) free(pAddresses);

	state = PdhOpenQueryA(NULL, NULL, &m_NetQuery);
	char pdh_counter[256] = { 0x0 };
	snprintf(pdh_counter, sizeof(pdh_counter), "\\Network Interface(%s)\\Bytes Total/sec", AgentNetAdapterDes.c_str());
	//state = PdhAddCounterA(m_NetQuery, "\\Network Interface(Realtek PCIe GbE Family Controller)\\Bytes Total/sec", NULL, &m_NetTotal);
	state = PdhAddCounterA(m_NetQuery, pdh_counter, NULL, &m_NetTotal);
	state = PdhCollectQueryData(m_NetQuery);
}

static void remove_counter() {
	PdhRemoveCounter(m_NetTotal);
}
#else
typedef struct CPUPACKED         //定义一个cpu occupy的结构体  
{  
    char name[20];      //定义一个char类型的数组名name有20个元素  
    unsigned int user; //定义一个无符号的int类型的user  
    unsigned int nice; //定义一个无符号的int类型的nice  
    unsigned int system;//定义一个无符号的int类型的system  
    unsigned int idle; //定义一个无符号的int类型的idle  
    unsigned int lowait;  
    unsigned int irq;  
    unsigned int softirq;  
}CPU_OCCUPY;

CPU_OCCUPY cpu_stat1;  
CPU_OCCUPY cpu_stat2;

static int get_cpuoccupy(CPU_OCCUPY *cpust){  
    FILE *fd;  
    char buff[256];  
    fd = fopen("/proc/stat", "r");  
    fgets(buff, sizeof(buff), fd);  
    sscanf(buff, "%s %u %u %u %u %u %u %u", cpust->name, &cpust->user, &cpust->nice, &cpust->system, &cpust->idle, &cpust->lowait, &cpust->irq, &cpust->softirq);  
    fclose(fd);  
    return 0;  
}

static int cal_cpuoccupy(CPU_OCCUPY *o, CPU_OCCUPY *n)  {
    unsigned long od, nd;  
    double cpu_use = 0;  
      
    od = (unsigned long)(o->user + o->nice + o->system + o->idle + o->lowait + o->irq + o->softirq);//第一次(用户+优先级+系统+空闲)的时间再赋给od  
    nd = (unsigned long)(n->user + n->nice + n->system + n->idle + n->lowait + n->irq + n->softirq);//第二次(用户+优先级+系统+空闲)的时间再赋给od  
    double sum = nd - od;  
    //double idle = n->idle - o->idle;  
    //cpu_idle= idle / sum;  
    double user = n->user + n->system + n->nice - o->user - o->system - o->nice;
    cpu_use = user * 100 / sum;  
    return cpu_use;
}  

typedef struct{
    uint64_t recvBytes;
    uint64_t sendBytes;
}NetFlowInfo;

char net_adapter_name[16] = {0x0};
NetFlowInfo nfi1;
NetFlowInfo nfi2;

static int get_default_net_adapter(){
    const char* cmd = "route |grep default |awk '{print $8}'";
    FILE *fp = popen(cmd, "r");
    if(fp == NULL) {
        return -1;
    }
    int ret = 0;
    char name[16] = {0x0};
    ret = fscanf(fp, "%s", name);
    pclose(fp);
    if (ret == 1){
        strcpy(net_adapter_name, name);
    }
    return 1;
}

#define BAND_CMD "ethtool %s |grep Speed| sed 's/Speed:/ /g'"
static int get_net_bandwidth(){
	char cmd[256] = {0x0};
    snprintf(cmd, sizeof(cmd), BAND_CMD, net_adapter_name);
	FILE *fp = popen(cmd, "r");
    if(fp == NULL) {
        return -1;
    }
    int ret = fscanf(fp,"%llu", &AgentBandWidth);
    pclose(fp);
    return ret;
}

#define CMD "/bin/cat /proc/net/dev |awk \'{ if(NR!=1 && NR!=2) print $0 }\' |grep %s |sed 's/:/ /g'| awk \'{ print $2\"  \"$10}\'"
static int getNetFlowInfo(NetFlowInfo* nfi){
    char cmd[256] = {0x0};
    snprintf(cmd, sizeof(cmd), CMD, net_adapter_name);
    FILE *fp = popen(cmd, "r");
    if(fp == NULL) {
        return -1;
    }
    int ret = fscanf(fp,"%lu%lu", &nfi->recvBytes, &nfi->sendBytes);
    pclose(fp);
    return ret;
}

static int get_net_use(){
	unsigned long long byte_count = (nfi2.sendBytes - nfi1.sendBytes) + (nfi2.recvBytes - nfi1.recvBytes);
	return byte_count;
}

static void init_net_adapter() {
	get_cpuoccupy(&cpu_stat1);
	get_default_net_adapter();
	get_net_bandwidth();
	getNetFlowInfo(&nfi1);
}
static void remove_counter() {}
static int get_machine_memory() {
  const char* virtual_filename = "/proc/meminfo";
  FILE* fd;

  char line[256] = {0};
  fd = fopen(virtual_filename, "r");
  if(fd == NULL) {
    exit(1);
  }
  char vmrss_name[32];
  int machine_memory;
  fgets(line, sizeof(line), fd);
  sscanf(line, "%s %d", vmrss_name, &machine_memory);
  fclose(fd);
  return machine_memory;
}

static int get_machine_memory_use() {
  const char* virtual_filename = "/proc/meminfo";
  FILE* fd;

  char line[256] = {0};
  fd = fopen(virtual_filename, "r");
  if(fd == NULL) {
    exit(1);
  }
  char vmrss_name[32];
  int memory;
  int memory_free;
  fgets(line, sizeof(line), fd);
  sscanf(line, "%s %d", vmrss_name, &memory);
  fgets(line, sizeof(line), fd);
  sscanf(line, "%s %d", vmrss_name, &memory_free);
  fclose(fd);
  return (memory - memory_free)*100/memory;
}
#endif


AgentProtocol::AgentProtocol(){
	AgentReportRootObject = cJSON_CreateObject();
	AgentReportRootArray = cJSON_CreateArray();
	cJSON_AddItemToObject(AgentReportRootObject, "data", AgentReportRootArray);
#ifdef __WINDOWS__
	ms.dwLength = sizeof(ms);
	GlobalMemoryStatusEx(&ms);

	state = PdhOpenQueryA(NULL, NULL, &m_CpuQuery);
	state = PdhAddCounterA(m_CpuQuery, ("\\Processor Information(_Total)\\% Processor Time"), NULL, &m_CpuTotal);
	state = PdhCollectQueryData(m_CpuQuery);
#endif
}


AgentProtocol::~AgentProtocol()
{
}


//固有函数，继承自基类
void AgentProtocol::ConnectionMade(HSOCKET hsock, PROTOCOL protocol)
{
	LOG(clogId, LOG_DEBUG, "stress server connection made: protocol=%d socket=%lld\r\n", protocol, hsock->fd);
	if (!authed) {
		HsocketSend(hsock, "sheeps", 6);
	}
#if defined(OPENSSL_SUPPORT) && defined(SHEEPS_SSL)
	else {
		this->Auth(hsock);
	}
#endif
}

void AgentProtocol::ConnectionFailed(HSOCKET hsock, int err)
{
	LOG(clogId, LOG_FAULT, "stress server connection failed socket = %lld\r\n", hsock->fd);
	this->StressHsocket = NULL;
}

void AgentProtocol::ConnectionClosed(HSOCKET hsock, int err)
{
	LOG(clogId, LOG_FAULT, "stress server connection closed socket = %lld\r\n", hsock->fd);
	this->StressHsocket = NULL;
	TaskManagerRuning = false;
	authed = false;

	remove_counter();
}

void AgentProtocol::ConnectionRecved(HSOCKET hsock, const char* data, int len)
{
	if (authed) {
		int packlen = 0;
		while (len > 0) {
			packlen = this->CheckRequest(hsock, data, len);
			if (packlen > 0) {
				len = HsocketPopBuf(hsock, packlen);
			}
			else if (packlen < 0) {
				HsocketClose(hsock);
				break;
			}
			else
				break;
		}
	}
	else {
		if (strncasecmp(data, "sheeps", 6) == 0) {
			authed = true;
			HsocketPopBuf(hsock, 6);
#if defined(OPENSSL_SUPPORT) && defined(SHEEPS_SSL)
			HsocketSSLCreate(hsock, SSL_CLIENT, 1, ca_crt, NULL, NULL);
#else
			this->Auth(hsock);
#endif
		}
	}
}

void AgentProtocol::Auth(HSOCKET hsock) {
	init_net_adapter();

	authtime = NOWTIME;
	long long mem_total = 0;
#ifdef __WINDOWS__
	mem_total = ms.ullTotalPhys/1024;
#else
	mem_total = get_machine_memory();
#endif

	char data[128] = { 0x0 };
	int len = snprintf(data, sizeof(data), "{\"CPU\":%d,\"Mem\":%llu,\"BandWidth\":%lld,\"ProjectID\":%d,\"GroupID\":\"%s\", \"Time\":%lld}", GetCpuCount(), mem_total / 1024, AgentBandWidth, AgentProjectid, AgentGroupid, (long long)authtime);
	//printf("%s:%d %s\n", __func__, __LINE__, data);
	t_stress_protocol_head head = { 0x0 };
	head.msgLen = SHEEPS_HEAD_SIZE + len;
	head.cmdNo = C2S_Agent_Report_Info;
	char buf[128] = { 0x0 };
	memcpy(buf, (char*)&head, SHEEPS_HEAD_SIZE);
	memcpy(buf + SHEEPS_HEAD_SIZE, data, len);
	HsocketSend(hsock, buf, head.msgLen);

	TaskManagerRuning = true;
}

int AgentProtocol::Loop(){
	if (this->StressHsocket == NULL ){
		char ip[40] = { 0x0 };
		GetHostByName(AgentManagerIP, ip, sizeof(ip));
		this->StressHsocket = HsocketConnect(this, ip, AgentManagerPort, TCP_PROTOCOL);
	}
	else if (TaskManagerRuning && this->StressHsocket){
		Heartbeat();
		TaskReport();
		//TaskTeportLog();
		agent_callback();
		task_free_data();
	}
	return 0;
}

void AgentProtocol::TaskReport() {
	static int size = 512, len = 0;
	static char* out = (char*)cJSON_malloc(size);
	int ret = task_push_report_data(AgentReportRootArray);
	if (ret) {
		len = SHEEPS_HEAD_SIZE;
		out = cJSON_PrintBinary(AgentReportRootObject, out, &size, &len);
		t_stress_protocol_head* head = (t_stress_protocol_head*)out;
		head->msgLen = len;
		head->cmdNo = C2S_Agent_Report_Date;
		head->binary = 1;

		HsocketSend(this->StressHsocket, out, len);

		//cJSON_free(out);
		for (int i = 0; i < ret; i++) {
			cJSON_DeleteItemFromArray(AgentReportRootArray, 0);
		}
	}
}

void AgentProtocol::TaskTeportLogFile(TaskReportLogInfo* info) {
	if (info->index == info->files->size()) {
		char data[128] = { 0x0 };
		int len = snprintf(data, sizeof(data), "{\"task_id\":%d}", info->task_id);

		t_stress_protocol_head head = { 0x0 };
		head.msgLen = SHEEPS_HEAD_SIZE + len;
		head.cmdNo = C2S_Task_Report_Log_Done;
		char buf[128] = { 0x0 };
		memcpy(buf, (char*)&head, SHEEPS_HEAD_SIZE);
		memcpy(buf + SHEEPS_HEAD_SIZE, data, len);
		HsocketSend(this->StressHsocket, buf, head.msgLen);

		ReportLogInfos.erase(info->task_id);
		delete info->files;
		free(info);
	}
	else {
		const char* file = (*info->files)[info->index].c_str();
		char path[256] = { 0x0 };
		snprintf(path, sizeof(path), "%s/%s", LogPath, file);
		FILE* hfile = NULL;
		if (info->hfile == NULL) {
#ifdef __WINDOWS__
			fopen_s(&hfile, path, "rb");
#else
			hfile = fopen(path, "rb");
#endif
			info->hfile = hfile;
		}
		else {
			hfile = info->hfile;
		}

		char data[5120] = { 0x0 };
		size_t n = fread(data, sizeof(char), sizeof(data), hfile);
		if (n < sizeof(data)) {
			fclose(hfile);
			info->hfile = NULL;
			info->index++;
		}

		cJSON* root = cJSON_CreateObject();
		cJSON_AddNumberToObject(root, "task_id", info->task_id);
		cJSON_AddStringToObject(root, "file", file);
		cJSON_AddRawToObject(root, "data", data, (int)n);

		int size = 0, len = 0;
		char* out = cJSON_PrintBinary(root, NULL, &size, &len);

		t_stress_protocol_head head = { 0x0 };
		head.msgLen = SHEEPS_HEAD_SIZE + len;
		head.cmdNo = C2S_Task_Report_Log;
		head.binary = 1;
		HsocketSend(this->StressHsocket, (char*)&head, SHEEPS_HEAD_SIZE);
		HsocketSend(this->StressHsocket, out, len);

		cJSON_free(out);
		cJSON_Delete(root);
	}
}

void AgentProtocol::TaskTeportLogInsert(int taskid, int projectid, int macid) {
	TaskReportLogInfo* loginfo = (TaskReportLogInfo*)malloc(sizeof(TaskReportLogInfo));
	if (loginfo) {
		memset(loginfo, 0x0, sizeof(TaskReportLogInfo));
		loginfo->task_id = taskid;
		loginfo->mac_id = macid;
		loginfo->files = new std::vector<std::string>;
		char file[24] = { 0x0 };
		int n = snprintf(file, sizeof(file), "task%d_p%d_m%d", taskid, projectid, macid);
		ReportLogInfos.insert(std::make_pair(taskid, loginfo));

#ifdef __WINDOWS__
		intptr_t hFile = 0;
		struct _finddata_t fileinfo;
		char dir[256] = { 0x0 };
		snprintf(dir, sizeof(dir), "%s/*", LogPath);
		if ((hFile = _findfirst(dir, &fileinfo)) != -1) {
			do {
				if (!(fileinfo.attrib & _A_SUBDIR) && strncmp(file, fileinfo.name, n) == 0) {
					loginfo->files->push_back(fileinfo.name);
				}
			} while (_findnext(hFile, &fileinfo) == 0);
				_findclose(hFile);
		}
#else
		DIR* dir;
		struct    dirent* ptr;
		if ((dir = opendir(LogPath)) != NULL) {
			while ((ptr = readdir(dir)) != NULL) {
				if ((ptr->d_type == DT_REG || ptr->d_type == DT_UNKNOWN) && strncmp(file, ptr->d_name, n) == 0) {
					loginfo->files->push_back(ptr->d_name);
				}
			}
			closedir(dir);
		}
#endif

		this->TaskTeportLogFile(loginfo);
	}
}

void AgentProtocol::Heartbeat(){
#define HEART_BEAT_TIME 5
	if (!check_time_tick(&this->heartbeat, 5)) return;

	int cpu_cast = 0;
	int net_cast = 0;
	char mem_cast = 0;
#ifdef  __WINDOWS__
	PDH_FMT_COUNTERVALUE cpuVal;
	PdhCollectQueryData(m_CpuQuery);
	PdhGetFormattedCounterValue(m_CpuTotal, PDH_FMT_DOUBLE, NULL, &cpuVal);
	cpu_cast = (int)cpuVal.doubleValue;

	PDH_FMT_COUNTERVALUE netVal;
	PdhCollectQueryData(m_NetQuery);
	PdhGetFormattedCounterValue(m_NetTotal, PDH_FMT_DOUBLE, NULL, &netVal);
	net_cast = (int)netVal.doubleValue;

	GlobalMemoryStatusEx(&ms);
	mem_cast = (char)ms.dwMemoryLoad;
#else
	get_cpuoccupy(&cpu_stat2);
	cpu_cast = cal_cpuoccupy(&cpu_stat1, &cpu_stat2);
	cpu_stat1 = cpu_stat2;

	getNetFlowInfo(&nfi2);
	net_cast = get_net_use()/HEART_BEAT_TIME;
	nfi1 = nfi2;

	mem_cast = get_machine_memory_use();
#endif //  __WINDOWS__

	char data[128] = { 0x0 };
	int len = snprintf(data, sizeof(data), "{\"timestamp\":%d,\"cpu\":%d,\"memory\":%d, \"bandwidth\":%d}", this->heartbeat,cpu_cast, mem_cast, net_cast);
	//printf("%s:%d %s\n", __func__, __LINE__, data);

	t_stress_protocol_head head = { 0x0 };
	head.msgLen = SHEEPS_HEAD_SIZE + len;
	head.cmdNo = C2S_Agent_Hertbaet;
	char buf[128] = { 0x0 };
	memcpy(buf, (char*)&head, SHEEPS_HEAD_SIZE);
	memcpy(buf + SHEEPS_HEAD_SIZE, data, len);
	HsocketSend(this->StressHsocket, buf, head.msgLen);
}

//自定义类成员函数
int AgentProtocol::CheckRequest(HSOCKET hsock, const char* data, int len)
{
	if (len < (int)SHEEPS_HEAD_SIZE)
		return 0;
	t_stress_protocol_head head;
	memcpy(&head, data, SHEEPS_HEAD_SIZE);
	if (len < head.msgLen)
		return 0;
	int clen = head.msgLen - SHEEPS_HEAD_SIZE;
	LOG(clogId, LOG_TRACE, "stress client recv [%d %d:%.*s]\r\n", head.msgLen, head.cmdNo, clen, data + SHEEPS_HEAD_SIZE);

	cJSON* root = head.binary ? cJSON_ParseBinary(data + SHEEPS_HEAD_SIZE, clen) : cJSON_ParseByte(data + SHEEPS_HEAD_SIZE, clen);
	if (root == NULL){
		LOG(clogId, LOG_ERROR, "json encoding error\r\n");
		return head.msgLen;;
	}
	do_client_func_by_cmd(hsock, head.cmdNo, root);
	cJSON_Delete(root);
	return head.msgLen;
}
